package org.qy.rocketmq.cs.service.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.qy.rocketmq.cs.domain.entity.User;
import org.qy.rocketmq.cs.rocket.listener.ConsumeMessageListener;
import org.qy.rocketmq.cs.utils.JsonUtil;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * All rights Reserved, Designed By www.cu-sc.com
 *
 * @Title:  ConsumeMessageListenerImpl.java.java
 * @Package org.qy.rocketmq.cs.service.consumer
 * @Description: TODO
 * @Since: JDK 1.8
 * @Author: fangyukang
 * @Email: fangyk@cu-sc.com
 * @Version: v1.0.0
 * @Date: 2020/7/1 22:21
 * @Copyright: 2020 www.cu-sc.com All rights reserved. <br/>
 * 注意：本内容仅限于联通集团内部传阅，禁止外泄以及用于其他的商业目的<br/>
 */
@Slf4j
@Component
public class ConsumeMessageListenerImpl implements ConsumeMessageListener
{
    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msg)
    {
        try {
            if (!msg.isEmpty()) {
                msg.forEach(this::singleBusiness);
            }
        } catch (Exception e) {
            log.error("ConsumeMessageListenerImpl|dealBody|consume mq error", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    public void singleBusiness(MessageExt messageExt)
    {
        String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("ConsumeMessageListenerImpl|singleBusiness|MQ消费日志  topic:{}  tags:{}  body:{}", messageExt.getTopic(), messageExt.getTags(), body);
        switch (messageExt.getTags()) {
            /* 订单下单成功 */
            case "ORDER_PLACE_SUCCESS" :
                User parse = JsonUtil.parse(body, User.class);
                break;
            default:
                log.warn("ConsumeMessageListenerImpl|singleBusiness|MQ消费，未找到对应的消息业务处理");
        }
    }
}
